共计 4589 个字符,预计需要花费 12 分钟才能阅读完成。
引入
我们知道一个线程同一时间内只能被操作系统分配一个 CPU 资源, 我们可以基于多进程实现并发, 也可以基于多线程实现并发, CPU正在运行一个任务, 有两种情况下会被切去执行其它任务, 一种是该任务发生了阻塞, 另一是该任务运行时间过长或者被其他优先级更高的任务夺走 CPU, 对于单线程来说, 如果一个单线程下运行了多个任务, 那么就不可避免的出现I/O 操作, 一旦一个任务出现阻塞的情况, 那么整个线程将处于阻塞状态 (因为一旦阻塞, CPU 资源将会被夺走), 但是如果我们能在自己的应用程序中 (即用户级别, 非操作系统级别) 控制单线程下多个任务能在一个任务遇到 I/O 之后立马切换到另一个任务, 那么我们就可以保证该线程能最大限度的保持就绪状态 (也就是随时能进入运行态), 相当于我们在应用程序级别将I/O 操作隐藏起来, 迷惑操作系统, 让其看到的状态就是一直在计算, I/O比较少, 于是操作系统就会将更多的 CPU 资源分配给该线程
一. 什么是协程
1. 回顾并发的本质
- CPU 在多个任务之间来回切换, 并且在切换之前保存好当前的状态
2. 协程是什么
- 基于单线程下实现的并发, 又称为微线程(Coroutine), 它是一种用户态的轻量级线程, 即协程是由应用程序自己控制调度的
- 与进程线程一样, 并不是真实存在的东西, 只是程序员为了方便理解虚拟出来的概念
3. 内核级别和用户级别的调度
- 线程的执行需要被操作系统分配 CPU 资源, 这属于内核级别的调度
- 而单个线程内的多个任务之间的调度是由应用程序自己实现的, 这属于用户级别的调度
4. 内核级别与用户级别在线程中的调度比较
- 优点 :
- 协程的开销更小, 更轻量, 属于应用程序级别的切换, 操作系统完全感觉不到
- 单线程下实现并发效果, 更大限度的利用 CPU 资源 (可以是一个程序开启多个进程, 一个进程开启多个线程, 每个线程开启协程)
- 特点 :
- 自己的应用程序实现的多个任务之间的调度, 遇到 I / O 就切, 可以将单线程的 I / O 降到最低
- 缺点 :
- 协程是在单线程下实现的, 所以它无法利用多核
- 引入协程, 就需要检测该线程下所有的 I / O 行为, 实现遇到 I / O 就切, 少一个都不行, 一旦协程出现了阻塞, 也将会阻塞整个线程
5. 使用 yieldd 模拟协程的效果
- yield 是在生成器那一章学到的知识点, 它可以保存状态, 这与操作系统保存线程的状态很像, 但它是属于代码级别控制的, 更轻量
import time
def Foo():
for i in range(100):
print(f"--->Foo:{i}")
yield # 保存状态
def Bar():
f = Foo()
for i in range(10000000):
i += 1
next(f)
start_time = time.time()
Bar()
stop_time = time.time()
print(f"user time:{stop_time-start_time}")
ps : yield无法检测 I/O, 无法实现遇到I/O 就进行切换
6. 总结协程特点
- 在单线程实现的并发
- 修改共享数据不需要加锁(不会造成同时修改的情况)
- 用户程序里自己保存多个控制流的上下文栈
- 一个协程遇到 I/O 操作会自动切换到其他协程
ps : 如何实现自动检测 I/O, 上面模拟使用的 yield 以及 greenlet 都无法做到, 于是以下就开始介绍 gevent 模块 (select 机制)
二.Gevent 模块介绍
1. 安装 Gevent
🍋"cmd" 或 "pycharm" 的 "Terminal"
pip3 install gevent
2. 什么是 gevent 模块
-
Geven t 是 Python 的第三方库, 它为各种并发和网络相关的任务提供了整洁的 API, 我们可以通过gevent 轻松实现并发同步或异步编程
-
在 gevent 中用到的主要模式是 Greenlet, 它是以C 扩展模块形式接入 Python 的轻量级协程
- Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度
3. 使用方法
- 常用方法
方法 | 作用 |
---|---|
gevent.spawn(func,args/kwargs) | 创建一个协程对象, 第一个参数是函数名, 后面的参数是函数的位置参数或者关键字参数 |
[协程对象].join() | 等待协程对象的结束 |
gevent.joinall([对象 1, 对象 2]) | 等待多个协程对象的结束, 参数是一个列表, 放多个协程对象 |
[协程对象].value | 拿到协程对象的返回值 |
- 示例: 遇到 I/O 自动切换任务
import gevent
import time
def eat(name):
print(f"{name}正在吃东西 ")
gevent.sleep(3)
print(f"{name}吃完了 ")
return "i am eat"
def play(name):
print(f"{name}正在玩手机 ")
gevent.sleep(1)
print(f"{name}玩够了手机 ")
return "i am play"
start_time = time.time()
g1 = gevent.spawn(eat," 派大星 ")
g2 = gevent.spawn(play," 海绵宝宝 ")
# g1.join() # 等待协程对象 g1 结束
# g2.join() # 等待协程对象 g2 结束
gevent.joinall([g1,g2]) # 等待协程对象 g1 和 g2 结束
print(g1.value) # 获取协程对象 g1 的返回值
print(g2.value) # 获取协程对象 g2 的返回值
print(f" 用时:{time.time()-start_time}")
''' 输出
派大星正在吃东西
海绵宝宝正在玩手机
海绵宝宝玩够了手机
派大星吃完了
i am eat
i am play
用时:3.0330262184143066
'''
4.gevent 不支持识别其他操作的 I /O
-
像
time.sleep(2)
或者其他类型的I/O, gevent 模块无法识别, 只能识别gevent.sleep(2)
-
解决方法 : 使用猴子补丁让其能识别
from gevent import monkey;monkey.patch_all()
, 放在文件开头
from gevent import monkey;monkey.patch_all()
import gevent
import time
from threading import current_thread
def eat(name):
print(current_thread().name) # 查看该线程的名字
print(f"{name}正在吃东西 ")
time.sleep(3)
print(f"{name}吃完了 ")
return "i am eat"
def drink(name):
print(current_thread().name) # 查看该线程的名字
print(f"{name}正在喝汤 ")
time.sleep(2)
print(f"{name}把汤喝完了 ")
return "i am drink"
def play(name):
print(current_thread().name) # 查看该线程的名字
print(f"{name}正在玩手机 ")
time.sleep(1)
print(f"{name}玩够了手机 ")
return "i am play"
start_time = time.time()
g1 = gevent.spawn(eat," 派大星 ")
g2 = gevent.spawn(drink," 章鱼哥 ")
g3 = gevent.spawn(play," 海绵宝宝 ")
# g1.join() # 等待协程对象 g1 结束
# g2.join() # 等待协程对象 g2 结束
# g3.join() # 等待协程对象 g3 结束
gevent.joinall([g1,g2,g3]) # 等待协程对象 g1 和 g2 结束
print(g1.value) # 获取协程对象 g1 的返回值
print(g2.value) # 获取协程对象 g2 的返回值
print(g3.value) # 获取协程对象 g3 的返回值
print(f" 用时:{time.time()-start_time}")
''' 输出
Dummy-1
派大星正在吃东西
Dummy-2
章鱼哥正在喝汤
Dummy-3
海绵宝宝正在玩手机
海绵宝宝玩够了手机
章鱼哥把汤喝完了
派大星吃完了
i am eat
i am drink
i am play
用时:3.0230190753936768
'''
🍓# 可以查看到三个线程的名字 : Dummy-1、Dummy-2、Dummy-3、(都是假线程)
三. 使用协程编写 socket-TCP 程序示例
1. 服务端
from gevent import monkey;monkey.patch_all() # 添加猴子补丁
import gevent
from socket import *
# 建链接循环
def link(ip,port):
try:
server = socket(AF_INET, SOCK_STREAM)
server.bind((ip,port))
server.listen(5)
except Exception as E:
print(E);return
while 1:
conn,addr = server.accept()
gevent.spawn(communication,conn)
# 建立链接成功之后开启一个协程任务进行通信循环
# 通信循环
def communication(conn):
while 1:
try:
data = conn.recv(1024)
if len(data) == 0:break
conn.send(data.upper())
except Exception as E:
print(E);break
conn.close()
if __name__ == '__main__':
g1 = gevent.spawn(link,"127.0.0.1",8090) # 先启动一个建立链接循环的协程任务
g1.join()
2. 客户端
- 客户端的编写可以是常规编写, 然后复制出多台客户端
from socket import *
client = socket(AF_INET,SOCK_STREAM)
client.connect(("127.0.0.1",8090))
while 1:
user = input(">>").strip()
if len(user) == 0:continue
client.send(user.encode("utf-8"))
data = client.recv(1024)
print(data.decode("utf-8"))
- 也可以使用多线程开启多个客户端, 不过公用同一个终端屏幕, 效果不明显
from threading import Thread,current_thread
from socket import *
def connection(ip,port,i):
client = socket(AF_INET,SOCK_STREAM)
client.connect((ip,port))
while 1:
client.send(f" 客户端编号:{i}, 名字:{current_thread().name}".encode("utf-8"))
data = client.recv(1024)
print(data.decode("utf-8"))
if __name__ == '__main__':
for i in range(10): # 多线程开启 10 个客户端进行与服务端的连接
t = Thread(target=connection,args=("127.0.0.1",8090,i))
t.start()